import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import pyspark
import seaborn as sns
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql.functions import col, column
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
# Start a local Spark session for the crime-data analysis.
crime = SparkSession.builder.master("local").appName("CrimeDataAnalysis").getOrCreate()
# BUG FIX: Spark does not perform shell tilde expansion, so a literal
# "~/scratch" would create a directory actually named "~"; expand it here.
crime.sparkContext.setCheckpointDir(os.path.expanduser("~/scratch"))
# Load the 2022 crime extract; inferSchema yields typed columns (int/bool/double).
Data22 = crime.read.csv("/storage/home/ssn5137/Project/Crimes_-_2022_20231016.csv", header=True, inferSchema=True)
Data22.printSchema()
# Output: root |-- ID: integer (nullable = true) |-- Case Number: string (nullable = true) |-- Date: string (nullable = true) |-- Block: string (nullable = true) |-- IUCR: string (nullable = true) |-- Primary Type: string (nullable = true) |-- Description: string (nullable = true) |-- Location Description: string (nullable = true) |-- Arrest: boolean (nullable = true) |-- Domestic: boolean (nullable = true) |-- Beat: integer (nullable = true) |-- District: integer (nullable = true) |-- Ward: integer (nullable = true) |-- Community Area: integer (nullable = true) |-- FBI Code: string (nullable = true) |-- X Coordinate: integer (nullable = true) |-- Y Coordinate: integer (nullable = true) |-- Year: integer (nullable = true) |-- Updated On: string (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) |-- Location: string (nullable = true)
# Load the 2023 crime extract with the same options as the 2022 load.
Data23 = crime.read.csv("/storage/home/ssn5137/Project/Crimes_-_2023_20231016.csv", inferSchema=True, header=True)
# BUG FIX: the original re-printed Data22's schema; show the 2023 schema instead.
Data23.printSchema()
# Output: root |-- ID: integer (nullable = true) |-- Case Number: string (nullable = true) |-- Date: string (nullable = true) |-- Block: string (nullable = true) |-- IUCR: string (nullable = true) |-- Primary Type: string (nullable = true) |-- Description: string (nullable = true) |-- Location Description: string (nullable = true) |-- Arrest: boolean (nullable = true) |-- Domestic: boolean (nullable = true) |-- Beat: integer (nullable = true) |-- District: integer (nullable = true) |-- Ward: integer (nullable = true) |-- Community Area: integer (nullable = true) |-- FBI Code: string (nullable = true) |-- X Coordinate: integer (nullable = true) |-- Y Coordinate: integer (nullable = true) |-- Year: integer (nullable = true) |-- Updated On: string (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) |-- Location: string (nullable = true)
# Stack both years into a single DataFrame (schemas match by position).
Data22_23 = Data22.union(Data23)
# PERF: count() triggers a full scan of the data; compute each year's count
# once instead of re-counting 2022 and 2023 for the combined total.
n22 = Data22.count()
n23 = Data23.count()
print(f"Total Entries in 2022: {n22}")
print(f"Total Entries in 2023: {n23}")
print(f"Total Entries in 2022 to 2023: {n22 + n23}")
# Output: Total Entries in 2022: 238990 Total Entries in 2023: 199486 Total Entries in 2022 to 2023: 438476
# Restrict to the seven columns the analyses below actually use.
df_clean = Data22_23.select(
    "Date", "Primary Type", "Description", "Location Description",
    "Community Area", "Year", "Arrest",
)
df_clean.printSchema()
# Output: root |-- Date: string (nullable = true) |-- Primary Type: string (nullable = true) |-- Description: string (nullable = true) |-- Location Description: string (nullable = true) |-- Community Area: integer (nullable = true) |-- Year: integer (nullable = true) |-- Arrest: boolean (nullable = true)
# Drop rows that are null in any of the selected analysis columns.
# BUG FIX: the original filtered the full Data22_23 frame, which silently
# discarded the column selection made just above; filter df_clean itself.
# dropna(how="any") is equivalent to chaining isNotNull() over every column.
df_clean = df_clean.dropna(how="any")
print(f"Total Entries after cleaning in 2022 to 2023: {df_clean.count()}")
# Output: Total Entries after cleaning in 2022 to 2023: 436722
# Count crimes per "Primary Type" with a classic map -> reduceByKey pipeline,
# then sort by frequency, most common first.
type_counts = (
    df_clean.rdd
    .map(lambda row: (row["Primary Type"], 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda pair: pair[1], ascending=False)
    .collect()
)
primary = []  # crime-type labels, for visualization
counts = []   # matching counts, for visualization
for primary_type, count in type_counts:
    print(f"{primary_type}: {count}")
    primary.append(primary_type)    # the first column (primary type)
    counts.append(int(count))       # the second column (counts)
# Output: THEFT: 97899 BATTERY: 75067 CRIMINAL DAMAGE: 50345 MOTOR VEHICLE THEFT: 44103 ASSAULT: 38218 DECEPTIVE PRACTICE: 27110 OTHER OFFENSE: 26380 ROBBERY: 17071 WEAPONS VIOLATION: 15655 BURGLARY: 13209 NARCOTICS: 8563 CRIMINAL TRESPASS: 7819 OFFENSE INVOLVING CHILDREN: 3250 CRIMINAL SEXUAL ASSAULT: 2792 SEX OFFENSE: 2258 PUBLIC PEACE VIOLATION: 1373 HOMICIDE: 1210 INTERFERENCE WITH PUBLIC OFFICER: 844 STALKING: 836 ARSON: 812 PROSTITUTION: 477 INTIMIDATION: 374 LIQUOR LAW VIOLATION: 346 CONCEALED CARRY LICENSE VIOLATION: 337 KIDNAPPING: 232 OBSCENITY: 75 HUMAN TRAFFICKING: 23 GAMBLING: 20 OTHER NARCOTIC VIOLATION: 9 PUBLIC INDECENCY: 8 NON-CRIMINAL: 7
# Horizontal bar chart of crime counts per type.
fig, ax = plt.subplots(figsize=(11, 7))
ax.barh(primary, counts)
ax.set_ylabel('Crime Type')
ax.set_xlabel('No. of Crimes')
ax.set_title('Crime')
plt.show()
# Collapse rare crime types (<= 2% of the total) into one "Other" slice so the
# pie chart stays readable.
threshold = 2  # percent of all crimes
total_count = sum(counts)
new_primary = []
new_counts = []
other_total = 0
# One pass instead of three: the original scanned the lists once for labels,
# once for kept counts, and once more to sum the "Other" remainder.
for crime_type, n in zip(primary, counts):
    if (n / total_count) * 100 > threshold:
        new_primary.append(crime_type)
        new_counts.append(n)
    else:
        other_total += n
new_primary.append('Other')
new_counts.append(other_total)
# Highlight the aggregated slice in yellow; 'Other' is always the last entry,
# so the original's membership check was redundant.
colors = list(plt.cm.tab20.colors)
colors[len(new_primary) - 1] = 'Yellow'
# Create the pie chart
plt.figure(figsize=(9, 9))
plt.pie(new_counts, labels=new_primary, autopct='%1.1f%%', colors=colors)
plt.title('Crime Type Distribution')
plt.axis('equal')
plt.show()
# Map-Reduce according to respective charts
# Count each (Primary Type, Description, Location Description) combination and
# order the combinations from most to least frequent.
triple_counts = (
    df_clean.rdd
    .map(lambda r: ((r['Primary Type'], r['Description'], r['Location Description']), 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda kv: kv[1], ascending=False)
)
sorted_results = triple_counts.collect()
# Lists for visualization
primary_types = []
descriptions = []
location_descriptions = []
counts = []
# Printing the top 10 categories
for (primary_type, description, location_description), count in sorted_results[:10]:
    print(f"{primary_type}, {description}, {location_description}: {count}")
    primary_types.append(primary_type)
    descriptions.append(description)
    location_descriptions.append(location_description)
    counts.append(int(count))
# Output: MOTOR VEHICLE THEFT, AUTOMOBILE, STREET: 24970 CRIMINAL DAMAGE, TO VEHICLE, STREET: 18432 BATTERY, DOMESTIC BATTERY SIMPLE, APARTMENT: 17352 THEFT, OVER $500, STREET: 15674 THEFT, $500 AND UNDER, STREET: 11300 BATTERY, DOMESTIC BATTERY SIMPLE, RESIDENCE: 8434 CRIMINAL DAMAGE, TO PROPERTY, APARTMENT: 8205 ASSAULT, SIMPLE, APARTMENT: 6451 THEFT, RETAIL THEFT, SMALL RETAIL STORE: 6391 CRIMINAL DAMAGE, TO PROPERTY, RESIDENCE: 5657
# Convert the results to a Pandas DataFrame for easier plotting
result_df = pd.DataFrame(
    {
        'Primary Type': primary_types,
        'Description': descriptions,
        'Location Description': location_descriptions,
        'Count': counts,
    }
)
# Create an interactive tree map with different colors for each primary type
# NOTE(review): color='Primary Type' is categorical, so Plotly picks a discrete
# palette and color_continuous_scale is ignored — kept to mirror the original.
fig = px.treemap(
    result_df,
    path=['Primary Type', 'Description', 'Location Description'],
    values='Count',
    color='Primary Type',  # Color by primary type
    title='Tree Map of Crime Categories',
    color_continuous_scale='Blues',
)
# Show the figure
fig.show()
# extracting hour and AM/PM from the 'Date' column
def extract_hour_am_pm(date_str):
    """Return "HH AM"/"HH PM" extracted from a date string, or None.

    Expects strings shaped like "MM/DD/YYYY HH:MM:SS AM": the time and the
    AM/PM marker are the 2nd and 3rd space-separated fields.  Malformed or
    null input yields None so the Spark UDF never raises.
    """
    # BUG FIX: the original bare `except:` swallowed every exception,
    # including KeyboardInterrupt/SystemExit.  Only a None/non-string
    # input can actually fail here, and it fails with AttributeError.
    try:
        parts = date_str.strip().split(" ")
    except AttributeError:
        return None
    if len(parts) >= 3:
        time_part = parts[1]
        am_pm = parts[2]
        hour = time_part.split(":")[0]
        return f"{hour} {am_pm}"
    return None
# Register the extractor as a Spark UDF and derive an Hour_AMPM column.
hour_udf = udf(extract_hour_am_pm, StringType())
df_clean = df_clean.withColumn('Hour_AMPM', hour_udf(df_clean['Date']))
# Aggregate crime counts per hour bucket, ordered lexicographically by label.
hourly_counts_list = (
    df_clean.groupBy('Hour_AMPM').count().orderBy('Hour_AMPM').collect()
)
for row in hourly_counts_list:
    print(f"Hour: {row['Hour_AMPM']}, Number of Crimes: {row['count']}")
# Output: Hour: 01 AM, Number of Crimes: 14427 Hour: 01 PM, Number of Crimes: 19172 Hour: 02 AM, Number of Crimes: 12786 Hour: 02 PM, Number of Crimes: 20090 Hour: 03 AM, Number of Crimes: 10824 Hour: 03 PM, Number of Crimes: 23286 Hour: 04 AM, Number of Crimes: 8854 Hour: 04 PM, Number of Crimes: 22621 Hour: 05 AM, Number of Crimes: 7629 Hour: 05 PM, Number of Crimes: 23389 Hour: 06 AM, Number of Crimes: 8123 Hour: 06 PM, Number of Crimes: 22989 Hour: 07 AM, Number of Crimes: 11027 Hour: 07 PM, Number of Crimes: 22358 Hour: 08 AM, Number of Crimes: 14769 Hour: 08 PM, Number of Crimes: 21838 Hour: 09 AM, Number of Crimes: 18208 Hour: 09 PM, Number of Crimes: 20468 Hour: 10 AM, Number of Crimes: 18350 Hour: 10 PM, Number of Crimes: 19657 Hour: 11 AM, Number of Crimes: 18867 Hour: 11 PM, Number of Crimes: 17811 Hour: 12 AM, Number of Crimes: 34470 Hour: 12 PM, Number of Crimes: 24709
hours = [row['Hour_AMPM'] for row in hourly_counts_list]
counts = [row['count'] for row in hourly_counts_list]
# histogram
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(hours, counts, color='green')
# titles and labels
ax.set_title('Crime Counts by Hour')
ax.set_xlabel('Hour')
ax.set_ylabel('Count')
ax.tick_params(axis='x', rotation=45)
plt.show()
# Sum arrests per crime type: each row contributes 1 only when Arrest is True.
mapped_arrests = df_clean.rdd.map(lambda row: (row["Primary Type"], 1 if row["Arrest"] else 0))
arrest_counts = mapped_arrests.reduceByKey(lambda a, b: a + b).collect()
# Keep the N crime types with the most arrests.
N = 10
sorted_arrest_counts = sorted(arrest_counts, key=lambda x: x[1], reverse=True)[:N]
# Group other counts into an "Other" category.
# PERF: build the top-N name set once; the original rebuilt a list (and did an
# O(N) membership scan on it) for every element of arrest_counts.
top_names = {crime_type for crime_type, _ in sorted_arrest_counts}
other_count = sum(count for crime_type, count in arrest_counts if crime_type not in top_names)
if other_count > 0:
    sorted_arrest_counts.append(('Other', other_count))
crime_types, counts = zip(*sorted_arrest_counts)
# N distinct colors for the top types plus grey for the aggregated slice.
colors = list(plt.cm.viridis(np.linspace(0, 1, N)))
colors.append('grey')
# Plotting pie chart
plt.figure(figsize=(12, 12))
wedges, texts, autotexts = plt.pie(counts, colors=colors, autopct='%1.1f%%', startangle=140, textprops=dict(color="w"))
plt.title('Percentage of Arrests by Crime Type')
plt.legend(wedges, crime_types, title="Crime Types", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))
plt.setp(autotexts, size=10, weight="bold")
plt.axis('equal')
plt.show()
# Count crimes per (type, community area) pair and sort descending by count.
area_pairs = df_clean.rdd.map(lambda r: ((r["Primary Type"], r["Community Area"]), 1))
sorted_primary_type_area = (
    area_pairs
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda kv: kv[1], ascending=False)
)
primary_type_area_counts_sorted = sorted_primary_type_area.collect()
# Printing top 10 results
for (primary_type, community_area), count in primary_type_area_counts_sorted[:10]:
    print(f"Crime Type: {primary_type}, Community Area: {community_area}, Number of Crimes: {count}")
# Output: Crime Type: THEFT, Community Area: 8, Number of Crimes: 7548 Crime Type: THEFT, Community Area: 32, Number of Crimes: 6086 Crime Type: THEFT, Community Area: 28, Number of Crimes: 5752 Crime Type: THEFT, Community Area: 24, Number of Crimes: 4568 Crime Type: BATTERY, Community Area: 25, Number of Crimes: 4417 Crime Type: THEFT, Community Area: 6, Number of Crimes: 4205 Crime Type: THEFT, Community Area: 25, Number of Crimes: 3200 Crime Type: BATTERY, Community Area: 43, Number of Crimes: 3071 Crime Type: THEFT, Community Area: 7, Number of Crimes: 2775 Crime Type: BATTERY, Community Area: 29, Number of Crimes: 2746
# PERF: reuse the results collected just above — the original called
# .collect() a second time, re-running the whole Spark job.
# Creating a dataframe with the collected data
data = {'Crime Type': [], 'Community Area': [], 'Count': []}
for (primary_type, community_area), count in primary_type_area_counts_sorted:
    data['Crime Type'].append(primary_type)
    data['Community Area'].append(community_area)
    data['Count'].append(count)
df_heatmap = pd.DataFrame(data)
# Heatmap
# BUG FIX: keyword arguments are required here — positional
# pivot(index, columns, values) was deprecated in pandas 1.1 and removed in 2.0.
heatmap_data = df_heatmap.pivot(index="Community Area", columns="Crime Type", values="Count")
# Graph
plt.figure(figsize=(25, 20))
sns.heatmap(heatmap_data, annot=False, cmap="Spectral", cbar_kws={'label': 'Number of Crimes'})
plt.title('Crime Counts by Community Area and Crime Type')
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()
# Release the Spark session now that all analyses are done.
crime.stop()